//AvenirMQ核心模块
const net = require('net');
const { getError, toLog, SUCCESS, AvenirMQ_ALL } = require('../common/common');
const User = require('./User');
const moment = require('moment');
const libcu = require('libcu');
const { EOL } = require('os');
const { succCode, succMessage } = require('../common/constants');
class Core {
    constructor() {
        this.user = new User(ini);
        this.signPool = {
            test: {
                name: 'test',
            },
        };
        this.messages = [];     //目前的消息队列
        this.connPoll = {};     //连接池
        this.gcMessages = [];
        //消息重发 处理死信的逻辑
        setInterval(async () => {
            await this.reSend();
        }, ((ini.mq.MQResend <= 50 && ini.mq.MQResend >= 2) ? ini.mq.MQResend : 10) * 1000);

        // //测试用代码，输出消息队列和gc队列
        // setInterval(async() => {
        //     console.log(EOL);
        //     console.log("this.messages = ",JSON.stringify(this.messages));
        //     console.log("this.gcMessages = ",JSON.stringify(this.gcMessages));
        // }, 2000);
        setInterval(async () => {
            //必须await 不然可能会脏读
            await this.autoClear();

        }, ((ini.mq.loopTime >= 10 && ini.mq.loopTime <= 120) ? ini.mq.loopTime : 10) * 1000);
    }
    async init(ip, port) {
        this.server = new net.createServer();
        this.server.on('connection', async (client) => {

            client.on('data', async (msg) => { //接收client发来的信息
                let info = client.address();   //{ address: '127.0.0.1', family: 'IPv4', port: 44944 } 可以获取客户端的ip地址
                let get = null;
                try {
                    get = JSON.parse(msg.toString());

                } catch (error) {
                    throw ('BAD_REQUEST');
                }
                // resValue = iconv.decode(msg,'utf8');
                // 在这里校验用户名和密码
                let type = get.type;      //与客户端预先定义接口
                toLog("服务端收到的数据为 ", get);

                try {
                    if (!get.data) {
                        throw ("BAD_COMMAND");
                    }
                    switch (type) {
                        case 'login':
                            {
                                //先判断是否为合法用户 随后解析协议
                                let sign = await this.user.login(get.data);
                                await this.add2ConnectPool(get.data, sign, client);
                                break;
                            }
                        //操作用户
                        case 'addUser':
                            this.user.addUser(get.data);
                            break;
                        case 'deleteUser':
                            this.user.delUser(get.data);
                            break;
                        case 'updateUser':
                            this.user.updateUser(get.data);
                            break;
                        case 'userList':
                            this.user.showUserLists();
                        //发送数据
                        case 'send':
                            //send把整个数据都传进去
                            await this.store(get);
                            break;
                        //更新用户绑定的key 很重要的功能
                        case 'setKey':
                            await this.setKey(get.data);
                            break;
                        default:
                            await this.response('BAD_COMMAND', client);
                            break;
                    }
                } catch (err) {
                    await this.response(err, client);
                }
            });

            client.on('error', function (e) { //监听客户端异常
                toLog('client error:' + e);
                client.end();
            });

            client.on('close', function () {
                toLog(`客户端下线了`);
            });

            client.setTimeout(parseInt(ini.mq.timeOut)); //从配置文件读取每个连接的超时时间
            client.on('timeout', () => {
                toLog('客户端超时了');
                client.end();
            });
        });
        this.server.listen(port, ip, function () {
            log.debug(`AvenirMQ运行在：${ip}:${port}`);
        });
    }


    //返回数据给客户端 兼容报错和正常的返回
    async response(type, client) {
        let res = null;
        if (typeof type == 'string') {
            res = getError(type);
            if (!res) {
                res = unknown;
            }
        } else {
            let code = type.code;
            let data = type.data;
            res = getError(code);
            if (!res) {
                res = unknown;
            }
            res.data = data;
        }
        client.write(JSON.stringify(res));
        //如果没有配置默认短连接

        if (ini.mq.keepAlive != true) {
            toLog("主动踢掉客户端的连接");
            client.end();
        }
    }

    //存放要发送的数据
    async store(data) {
        let sign = data.sign;
        if (!this.signPool[sign]) {
            throw ('INVALID_SIGN');
        }
        let name = this.signPool[sign].name;
        // 20210120 更新连接池的使用时间
        this.signPool[sign].createTime = moment().valueOf();
        toLog("name = ", name);
        toLog("this.user.userList = ", this.user.userList);
        let keys = this.user.userList[name].key;
        toLog("键值为 ", keys);
        //顺便把数据放进去
        keys.data = data.data;
        keys.sign = sign;
        //20210123 增加创建时间信息
        keys.createTime = moment().valueOf();
        this.messages.push(keys);
        try {
            await this.AvenirMQSend(this.messages);
        } catch (error) {
            toLog("AvenirMQSend主动发送消息失败 error->", error);
        }
        throw (SUCCESS);
    }

    //触发类函数 调用之后就开始发送消息
    async AvenirMQSend(msg, type) {
        //20210110先写个简单版的 不用promise.all发送消息
        for (let i = 0; i < msg.length; i++) {
            try {
                let sub = null;
                let gcInfo = {};        //gc的时候取到要发送的接收方信息
                if (type == 'gc') {
                    let temp = msg[i];
                    let count = temp.count;
                    //超过重发时间就删除这条消息
                    if (count >= ini.mq.retryTime) {
                        toLog("删除了gc队列的死信:", temp);
                        this.gcMessages.splice(i, 1);
                        i--;
                        continue;
                    }
                    sub = temp.msg;
                    gcInfo = temp.info;
                } else {
                    sub = msg[i];
                }
                toLog("sub = ", sub);
                toLog("this.signPool = ", this.signPool);
                let text = {
                    sender: sub.send,
                    data: sub.data,
                }
                let bind = this.user.binds[sub.to];
                if (!bind || !bind.length) {
                    //说明没有这样的消息类型
                    continue;
                }
                toLog("bind = ", bind);
                //20210117如果是gc的就直接直接按照msg存的info发消息吧
                if (type != 'gc') {
                    for (let j = 0; j < bind.length; j++) {
                        //20210116增加对类型的判断
                        if ((bind[j].type === sub.type || bind[j].type === AvenirMQ_ALL)
                            && (bind[j].receive === sub.to || bind[j].receive === AvenirMQ_ALL)) {
                            //说明这是要发送的消息
                            let info = {
                                ip: bind[j].ip,
                                port: bind[j].port,
                            };
                            let conn = null;

                            if (this.connPoll[bind[j].ip] && this.connPoll[bind[j].ip][bind[j].port]) {
                                conn = this.connPoll[bind[j].ip][bind[j].port].conn;
                                // 20210120 更新创建时间
                                this.connPoll[bind[j].ip][bind[j].port].createTime = moment().valueOf();
                            }
                            let newSub = JSON.parse(JSON.stringify(sub));
                            await this.send(text, conn, info, newSub, type, i);
                        } else {
                            //20210122 当j遍历到最后面再输出啊……
                            if (j === bind.length - 1) {
                                toLog("存在无人接收的信息", msg[i]);
                            }
                        }
                    }
                } else {
                    let conn = null;
                    if (this.connPoll[gcInfo.ip] && this.connPoll[gcInfo.ip][gcInfo.port]) {
                        conn = this.connPoll[gcInfo.ip][gcInfo.port].conn;
                        this.connPoll[gcInfo.ip][gcInfo.port].createTime = moment().valueOf();
                    }
                    await this.send(text, conn, gcInfo, sub, type, i);
                }

                if (type == 'gc') {
                    this.gcMessages.splice(i, 1);
                    toLog("gc队列清除本次消息");
                } else {
                    this.messages.splice(i, 1);
                }

                toLog("发送完消息的message为 ", this.messages);
            } catch (error) {
                toLog("AvenirMQSend error->", error);
            }

        }
    }

    //封装的发送函数
    async send(json, conn, info, msg, type, index) {
        //20210119 增加返回值中的code和message
        json.code = succCode;
        json.message = succMessage;
        let promise = new Promise((resolve, reject) => {
            try {
                if (!conn) {
                    throw ("连接池中不存在该连接");
                }
                conn.write(JSON.stringify(json));
                toLog("通过连接池发消息成功");
                resolve(SUCCESS);
            } catch (error) {
                try {
                    toLog("连接写数据报错了,error->", error);
                    //20210114 如果是传了连接但是发消息失败 剔除这个连接信息，下次就会重连了
                    if (conn) {
                        delete this.connPoll[info.ip][info.port].conn;
                    }

                    let client = new net.Socket();  //后期改缓存连接
                    client.connect({
                        host: info.ip,
                        port: info.port,
                    })

                    //客户端与服务器建立连接触发
                    client.on('connect', () => {
                        if (!this.connPoll[info.ip]) {
                            this.connPoll[info.ip] = {};
                        }
                        this.connPoll[info.ip][info.port] = {

                            conn: client,
                            createTime: moment().valueOf()

                        }

                        client.write(JSON.stringify(json));
                    });

                    //客户端接收数据触发
                    client.on('data', (data) => {
                        // toLog("AvenirMQ收到应答 ", libcu.tools.safeJsonParse(data));
                        toLog("AvenirMQ收到应答 ", data.toString());
                        if (!this.connPoll[info.ip]) {
                            this.connPoll[info.ip] = {};
                        }
                        this.connPoll[info.ip][info.port] = {

                            conn: client,
                            createTime: moment().valueOf()

                        }

                        resolve(data.toString());
                    });

                    client.on('error', (error) => {
                        toLog("AvenirMQ发消息报错,error:", error);
                        //20210116 如果是连接池发消息报错的，需要将这个连接删除! 否则永远不会恢复正常了

                        if (conn && this.connPoll[info.ip] && this.connPoll[info.ip][info.port]) {
                            delete this.connPoll[info.ip][info.port];
                        }

                        //20210114把错误的消息存起来 下次再发送
                        if (type != 'gc') {
                            toLog("插入gc队列的消息为 ", msg);
                            let pushed = {
                                msg: msg,
                                count: 0,
                                info: info,
                            }
                            this.gcMessages.push(pushed);
                            this.messages.splice(index, 1);
                            toLog("目前gc的队列为 ", JSON.stringify(this.gcMessages));
                        } else {
                            if (this.gcMessages[index]) {
                                this.gcMessages[index].count++;
                                toLog("增加了重试的计数:", this.gcMessages[index].count);
                            } else {
                                reject('未找到该消息 已被回收');
                            }

                        }
                        reject(error);
                    })

                    client.on('timeout', (error) => {
                        toLog("AvenirMQ主动发送消息超时,error->", error);
                        reject('timeout');
                    })
                } catch (error) {
                    toLog("last catch error->", error);
                    reject(error);
                }

            }
        })
        return promise;
    }

    async add2ConnectPool(data, sign, client) {
        this.signPool[sign] = {
            conn: client,
            name: data.name,
            createTime: moment().valueOf(),
        };
        throw ({ code: SUCCESS, data: sign });
    }

    async reSend() {
        await this.AvenirMQSend(this.gcMessages, 'gc');
        // for(let i=0;i<this.gcMessages.length;i++) {

        // }
    }

    //修改用户绑定的key
    async setKey(data) {
        let name = data.name;
        let key = data.key;
        if (!this.user.userList[name]) {
            throw ('USER_NOW_FOUND');
        }
        let parsed = this.user.parseKey(key);
        this.user.userList[name].key = parsed;
        //刷新
        this.user.refresh(true);
        throw (SUCCESS);
    }

    async autoClear() {
        await Promise.all([this.clearConn(), this.clearSign()],this.clearMsgList(),this.clearGcList());
    }

    async clearConn() {
        let diffTime = ini.mq.connClearTime > 0 ? ini.mq.connClearTime : 120;
        let now = moment().valueOf();
        for (let ip in this.connPoll) {
            let ports = this.connPoll[ip];
            for (let port in ports) {
                let conns = ports[port];
                if (moment(now).diff(conns.createTime, 'seconds') > diffTime) {
                    try {
                        conns.conn.end();
                    } catch (error) {
                        toLog("自动关闭连接失败 ,error->", error);
                    }
                    toLog("自动清除了连接 ports[port] = ", ports[port]);
                    delete ports[port];
                }
            }
        }
        return;
    }

    async clearSign() {
        let diffTime = ini.mq.signClearTime > 0 ? ini.mq.signClearTime : 120;
        let now = moment().valueOf();
        for (let sign in this.signPool) {
            let signs = this.signPool[sign];
            if (moment(now).diff(signs.createTime, 'seconds') > diffTime) {
                toLog("自动清理了签名", this.signPool[sign]);
                delete this.signPool[sign];
            }

        }
        return;
    }

    async clearMsgList() {
        if (!ini.mq.autoClear || ini.mq.autoClear <= 0) {
            return;
        }
        let diffTime = (ini.mq.autoClear > 120 || ini.mq.autoClear < 300) ? ini.mq.autoClear : 120;
        for(let i=0;i<this.messages.length;i++) {
            let msg = this.messages[i];
            if(moment(now).diff(msg.createTime,'seconds') > diffTime) {
                this.messages.splice(i,1);
                i--;
            }
        }
    }

    async clearGcList() {
        if (!ini.mq.autoClear || ini.mq.autoClear <= 0) {
            return;
        }
        let diffTime = (ini.mq.autoClear > 120 || ini.mq.autoClear < 300) ? ini.mq.autoClear : 120;
        for(let i=0;i<this.gcMessages.length;i++) {
            let msg = this.gcMessages[i];
            if(moment(now).diff(msg.createTime,'seconds') > diffTime) {
                this.gcMessages.splice(i,1);
                i--;
            }
        }
    }
}

module.exports = Core;